{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "id": "a1d65d37-6a53-4473-beae-4c7d8a2ea3fa",
   "metadata": {
    "execution": {
     "iopub.execute_input": "2022-07-06T11:50:51.209651Z",
     "iopub.status.busy": "2022-07-06T11:50:51.209343Z",
     "iopub.status.idle": "2022-07-06T11:50:55.053965Z",
     "shell.execute_reply": "2022-07-06T11:50:55.053200Z",
     "shell.execute_reply.started": "2022-07-06T11:50:51.209580Z"
    },
    "tags": []
   },
   "outputs": [
    {
     "data": {
      "text/plain": [
       "ClientContext(dashboard_url='127.0.0.1:8265', python_version='3.7.10', ray_version='1.13.0', ray_commit='e4ce38d001dbbe09cd21c497fedd03d692b2be3e', protocol_version='2022-03-16', _num_clients=4, _context_to_restore=<ray.util.client._ClientContext object at 0x7f5efc2c6710>)"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "import ray\n",
    "import pyarrow as pa\n",
    "\n",
    "ray.shutdown()\n",
    "ray.init(address=\"ray://localhost:10001\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "id": "390b8312-fbd8-462f-9534-dd48e5173447",
   "metadata": {
    "execution": {
     "iopub.execute_input": "2022-07-06T11:50:56.517276Z",
     "iopub.status.busy": "2022-07-06T11:50:56.516903Z",
     "iopub.status.idle": "2022-07-06T11:56:21.107443Z",
     "shell.execute_reply": "2022-07-06T11:56:21.106101Z",
     "shell.execute_reply.started": "2022-07-06T11:50:56.517236Z"
    },
    "tags": []
   },
   "outputs": [
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Metadata Fetch Progress:   0%|          | 0/71 [00:00<?, ?it/s]\n",
      "Metadata Fetch Progress:   0%|          | 0/71 [00:00<?, ?it/s]\n",
      "Metadata Fetch Progress:   0%|          | 0/71 [00:00<?, ?it/s]\n",
      "Metadata Fetch Progress:   0%|          | 0/71 [00:00<?, ?it/s]\n",
      "Metadata Fetch Progress:   0%|          | 0/71 [00:00<?, ?it/s]\n",
      "Metadata Fetch Progress:   0%|          | 0/71 [00:00<?, ?it/s]\n",
      "Metadata Fetch Progress:   0%|          | 0/71 [00:00<?, ?it/s]\n",
      "Metadata Fetch Progress:   0%|          | 0/71 [00:00<?, ?it/s]\n",
      "Metadata Fetch Progress:   0%|          | 0/71 [00:00<?, ?it/s]\n",
      "Metadata Fetch Progress:   0%|          | 0/71 [00:01<?, ?it/s]\n",
      "Metadata Fetch Progress:   0%|          | 0/71 [00:01<?, ?it/s]\n",
      "Metadata Fetch Progress:   0%|          | 0/71 [00:01<?, ?it/s]\n",
      "Metadata Fetch Progress:   0%|          | 0/71 [00:01<?, ?it/s]\n",
      "Metadata Fetch Progress:   0%|          | 0/71 [00:01<?, ?it/s]\n",
      "Metadata Fetch Progress:   3%|▎         | 2/71 [00:01<00:04, 14.42it/s]\n",
      "Metadata Fetch Progress:   4%|▍         | 3/71 [00:01<00:07,  9.62it/s]\n",
      "Metadata Fetch Progress:   6%|▌         | 4/71 [00:01<00:08,  8.17it/s]\n",
      "Metadata Fetch Progress:   8%|▊         | 6/71 [00:02<00:06, 10.32it/s]\n",
      "Metadata Fetch Progress:  14%|█▍        | 10/71 [00:02<00:04, 14.78it/s]\n",
      "Metadata Fetch Progress:  17%|█▋        | 12/71 [00:02<00:04, 13.53it/s]\n",
      "Metadata Fetch Progress:  21%|██        | 15/71 [00:02<00:03, 17.03it/s]\n",
      "Metadata Fetch Progress:  27%|██▋       | 19/71 [00:02<00:02, 22.32it/s]\n",
      "Metadata Fetch Progress:  31%|███       | 22/71 [00:02<00:02, 22.79it/s]\n",
      "Metadata Fetch Progress:  38%|███▊      | 27/71 [00:02<00:01, 26.41it/s]\n",
      "Metadata Fetch Progress:  46%|████▋     | 33/71 [00:03<00:01, 28.47it/s]\n",
      "Metadata Fetch Progress:  51%|█████     | 36/71 [00:03<00:01, 24.89it/s]\n",
      "Metadata Fetch Progress:  56%|█████▋    | 40/71 [00:03<00:01, 27.60it/s]\n",
      "Metadata Fetch Progress:  65%|██████▍   | 46/71 [00:03<00:00, 34.85it/s]\n",
      "Metadata Fetch Progress:  73%|███████▎  | 52/71 [00:03<00:00, 39.40it/s]\n",
      "Metadata Fetch Progress:  80%|████████  | 57/71 [00:03<00:00, 34.80it/s]\n",
      "Metadata Fetch Progress:  92%|█████████▏| 65/71 [00:03<00:00, 44.32it/s]\n",
      "Metadata Fetch Progress: 100%|██████████| 71/71 [00:04<00:00, 16.66it/s]\n",
      "Caught schedule exception\n",
      "2022-07-06 11:51:02,015\tINFO common.py:220 -- Exception from actor creation is ignored in destructor. To receive this exception in application code, call a method on the actor reference before its destructor is run.\n",
      "Read:  50%|█████     | 100/200 [00:08<00:05, 17.98it/s]"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[2m\u001b[1m\u001b[36m(scheduler +18s)\u001b[0m Tip: use `ray status` to view detailed cluster status. To disable these messages, set RAY_SCHEDULER_EVENTS=0.\n",
      "\u001b[2m\u001b[1m\u001b[33m(scheduler +18s)\u001b[0m Warning: The following resource request cannot be scheduled right now: {'CPU': 1.0}. This is likely due to all cluster resources being claimed by actors. Consider creating fewer actors or adding more nodes to this Ray cluster.\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Read: 100%|██████████| 200/200 [00:24<00:00,  8.02it/s]\n",
      "Sort Sample: 100%|██████████| 200/200 [00:01<00:00, 140.76it/s]\n",
      "Shuffle Map:  12%|█▏        | 24/200 [00:11<02:10,  1.35it/s]\u001b[2m\u001b[36m(raylet)\u001b[0m Spilled 3172 MiB, 10 objects, write throughput 148 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.\n",
      "Shuffle Map:  16%|█▌        | 32/200 [00:24<02:36,  1.07it/s]\u001b[2m\u001b[36m(raylet)\u001b[0m Spilled 4230 MiB, 12 objects, write throughput 121 MiB/s.\n",
      "Shuffle Map:  25%|██▌       | 50/200 [00:33<01:18,  1.91it/s]\u001b[2m\u001b[36m(raylet, ip=172.31.26.111)\u001b[0m Spilled 9735 MiB, 287 objects, write throughput 227 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.\n",
      "Shuffle Map:  31%|███       | 62/200 [00:47<01:30,  1.52it/s]\u001b[2m\u001b[36m(raylet, ip=172.31.26.33)\u001b[0m Spilled 10652 MiB, 22 objects, write throughput 205 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.\n",
      "Shuffle Map:  36%|███▌      | 71/200 [00:51<00:36,  3.56it/s]The node with node id: ba27383322426fed6b6abc1ea7ec7647fcf168c22c6dbe1afbddc7e3 and address: 172.31.24.150 and node name: 172.31.24.150 has been marked dead because the detector has missed too many heartbeats from it. This can happen when a raylet crashes unexpectedly or has lagging heartbeats.\n",
      "Shuffle Map:  40%|████      | 81/200 [01:01<00:58,  2.02it/s]"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "\u001b[2m\u001b[1m\u001b[33m(scheduler +1m38s)\u001b[0m Warning: The following resource request cannot be scheduled right now: {'CPU': 1.0}. This is likely due to all cluster resources being claimed by actors. Consider creating fewer actors or adding more nodes to this Ray cluster.\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "Shuffle Map:  62%|██████▎   | 125/200 [01:19<00:22,  3.40it/s]\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m [2022-07-06 11:52:49,138 C 18262 18262] (raylet) node_manager.cc:170: This node has beem marked as dead.\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m *** StackTrace Information ***\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m     ray::SpdLogMessage::Flush()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m     ray::RayLog::~RayLog()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m     std::_Function_handler<>::_M_invoke()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m     std::_Function_handler<>::_M_invoke()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m     std::_Function_handler<>::_M_invoke()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m     ray::rpc::ClientCallImpl<>::OnReplyReceived()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m     std::_Function_handler<>::_M_invoke()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m     EventTracker::RecordExecution()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m     std::_Function_handler<>::_M_invoke()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m     boost::asio::detail::completion_handler<>::do_complete()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m     boost::asio::detail::scheduler::do_run_one()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m     boost::asio::detail::scheduler::run()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m     boost::asio::io_context::run()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m     main\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m     __libc_start_main\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.24.150)\u001b[0m \n",
      "Shuffle Map:  69%|██████▉   | 138/200 [01:21<00:09,  6.71it/s]\u001b[2m\u001b[36m(raylet)\u001b[0m Spilled 16559 MiB, 687 objects, write throughput 180 MiB/s.\n",
      "Shuffle Map:  80%|████████  | 161/200 [01:31<00:16,  2.32it/s]\u001b[2m\u001b[36m(raylet)\u001b[0m Spilled 16559 MiB, 691 objects, write throughput 180 MiB/s.\n",
      "Shuffle Map:  82%|████████▏ | 164/200 [01:33<00:17,  2.01it/s]\u001b[2m\u001b[36m(raylet, ip=172.31.26.111)\u001b[0m Spilled 9748 MiB, 288 objects, write throughput 227 MiB/s.\n",
      "Shuffle Map:  94%|█████████▍| 189/200 [02:00<00:14,  1.32s/it]\u001b[2m\u001b[36m(raylet, ip=172.31.26.33)\u001b[0m Spilled 10657 MiB, 23 objects, write throughput 205 MiB/s.\n",
      "Shuffle Map:  98%|█████████▊| 196/200 [02:52<00:25,  6.27s/it]\u001b[2m\u001b[36m(raylet, ip=172.31.26.111)\u001b[0m Spilled 13488 MiB, 289 objects, write throughput 283 MiB/s.\n",
      "Shuffle Map:  99%|█████████▉| 198/200 [03:02<00:10,  5.38s/it]\u001b[2m\u001b[36m(raylet, ip=172.31.26.33)\u001b[0m Spilled 15934 MiB, 24 objects, write throughput 252 MiB/s.\n",
      "\u001b[2m\u001b[36m(raylet, ip=172.31.26.111)\u001b[0m Spilled 16739 MiB, 290 objects, write throughput 323 MiB/s.\n",
      "Shuffle Map: 100%|██████████| 200/200 [04:41<00:00,  1.41s/it]\n",
      "Shuffle Reduce:   0%|          | 0/200 [00:00<?, ?it/s]\u001b[2m\u001b[36m(raylet, ip=172.31.26.33)\u001b[0m Spilled 16766 MiB, 161 objects, write throughput 262 MiB/s.\n",
      "Shuffle Reduce: 100%|██████████| 200/200 [00:09<00:00, 21.43it/s]\n"
     ]
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "{'product_category': 'Apparel', 'count()': 5906460}\n",
      "{'product_category': 'Automotive', 'count()': 3516476}\n",
      "{'product_category': 'Baby', 'count()': 1764893}\n",
      "{'product_category': 'Beauty', 'count()': 5115721}\n",
      "{'product_category': 'Books', 'count()': 20726160}\n",
      "{'product_category': 'Camera', 'count()': 1838760}\n",
      "{'product_category': 'Digital_Ebook_Purchase', 'count()': 19180765}\n",
      "{'product_category': 'Digital_Music_Purchase', 'count()': 1852184}\n",
      "{'product_category': 'Digital_Software', 'count()': 102084}\n",
      "{'product_category': 'Digital_Video_Download', 'count()': 5173743}\n",
      "{'product_category': 'Digital_Video_Games', 'count()': 145431}\n",
      "{'product_category': 'Electronics', 'count()': 3120938}\n",
      "{'product_category': 'Furniture', 'count()': 792214}\n",
      "{'product_category': 'Gift_Card', 'count()': 149086}\n",
      "{'product_category': 'Grocery', 'count()': 2402478}\n",
      "{'product_category': 'Health_&_Personal_Care', 'count()': 5332883}\n",
      "{'product_category': 'Home', 'count()': 6228567}\n",
      "{'product_category': 'Home_Entertainment', 'count()': 743700}\n",
      "{'product_category': 'Home_Improvement', 'count()': 2640654}\n",
      "{'product_category': 'Jewelry', 'count()': 1767753}\n",
      "CPU times: user 17.5 s, sys: 3.34 s, total: 20.9 s\n",
      "Wall time: 5min 24s\n"
     ]
    },
    {
     "name": "stderr",
     "output_type": "stream",
     "text": [
      "The node with node id: 5fe5a04368d8c94ac607757c58e4ed45324d2ae691b76808ee069e4a and address: 172.31.29.141 and node name: 172.31.29.141 has been marked dead because the detector has missed too many heartbeats from it. This can happen when a raylet crashes unexpectedly or has lagging heartbeats.\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m [2022-07-06 12:16:23,113 C 18274 18274] (raylet) node_manager.cc:170: This node has beem marked as dead.\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m *** StackTrace Information ***\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m     ray::SpdLogMessage::Flush()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m     ray::RayLog::~RayLog()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m     std::_Function_handler<>::_M_invoke()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m     std::_Function_handler<>::_M_invoke()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m     std::_Function_handler<>::_M_invoke()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m     ray::rpc::ClientCallImpl<>::OnReplyReceived()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m     std::_Function_handler<>::_M_invoke()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m     EventTracker::RecordExecution()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m     std::_Function_handler<>::_M_invoke()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m     boost::asio::detail::completion_handler<>::do_complete()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m     boost::asio::detail::scheduler::do_run_one()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m     boost::asio::detail::scheduler::run()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m     boost::asio::io_context::run()\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m     main\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m     __libc_start_main\n",
      "\u001b[2m\u001b[33m(raylet, ip=172.31.29.141)\u001b[0m \n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m Exception in thread ray_import_thread:\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m Traceback (most recent call last):\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m   File \"/usr/lib64/python3.7/threading.py\", line 926, in _bootstrap_inner\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m     self.run()\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m   File \"/usr/lib64/python3.7/threading.py\", line 870, in run\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m     self._target(*self._args, **self._kwargs)\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m   File \"/usr/local/lib/python3.7/site-packages/ray/_private/import_thread.py\", line 70, in _run\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m     key = self.subscriber.poll()\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m   File \"/usr/local/lib/python3.7/site-packages/ray/_private/gcs_pubsub.py\", line 385, in poll\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m     self._poll_locked(timeout=timeout)\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m   File \"/usr/local/lib/python3.7/site-packages/ray/_private/gcs_pubsub.py\", line 241, in _poll_locked\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m     self._poll_request(), timeout=timeout\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m   File \"/usr/local/lib64/python3.7/site-packages/grpc/_channel.py\", line 976, in future\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m     (operations,), event_handler, self._context)\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m   File \"/usr/local/lib64/python3.7/site-packages/grpc/_channel.py\", line 1306, in create\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m     _run_channel_spin_thread(state)\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m   File \"/usr/local/lib64/python3.7/site-packages/grpc/_channel.py\", line 1270, in _run_channel_spin_thread\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m     channel_spin_thread.start()\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m   File \"src/python/grpcio/grpc/_cython/_cygrpc/fork_posix.pyx.pxi\", line 117, in grpc._cython.cygrpc.ForkManagedThread.start\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m   File \"/usr/lib64/python3.7/threading.py\", line 852, in start\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m     _start_new_thread(self._bootstrap, ())\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m RuntimeError: can't start new thread\n",
      "\u001b[2m\u001b[36m(map pid=19106, ip=172.31.29.141)\u001b[0m \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,750 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,750 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,750 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,750 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,750 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,750 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,750 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,750 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,750 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,751 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,751 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,751 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,751 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,751 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,751 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,751 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,751 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,751 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,751 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,751 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,751 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,751 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,751 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,752 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,752 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,752 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,752 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,752 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,752 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,752 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,752 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,752 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,752 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,752 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,752 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,752 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,752 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,752 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,752 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,752 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,753 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,753 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,753 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,753 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,753 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,753 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,753 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,753 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n",
      "\u001b[2m\u001b[36m(pid=gcs_server)\u001b[0m [2022-07-06 12:16:23,753 E 29692 29692] (gcs_server) gcs_server.cc:283: Failed to get the resource load: GrpcUnavailable: RPC Error message: Socket closed; RPC Error details: \n"
     ]
    }
   ],
   "source": [
    "%%time\n",
    "\n",
    "def cast(t: pa.Table) -> pa.Table:\n",
    "    schema = t.schema\n",
    "    field_idx = schema.get_field_index(\"review_body\")\n",
    "    field = schema.field(field_idx)\n",
    "    new_field = field.with_type(pa.large_string())\n",
    "    new_schema = schema.set(field_idx, new_field)\n",
    "    return t.cast(new_schema)\n",
    "\n",
    "ds = ray.data.read_parquet(\"s3://dsoaws/parquet/\", _block_udf=cast)\n",
    "\n",
    "ds.groupby(\"product_category\").count().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "e2fc0b7c-c846-4424-86e6-77cf5f3e03e1",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "582cb35f-04d9-4e45-9a58-eea3d1b5b9ec",
   "metadata": {},
   "outputs": [],
   "source": []
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1db14292-8151-4fc1-ab18-f4997cee5cbd",
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
